// compio_driver/sys/driver/iocp/mod.rs

1use std::{
2    collections::HashMap, marker::PhantomData, os::windows::io::AsRawHandle, sync::Arc,
3    time::Duration,
4};
5
6use flume::{Receiver, Sender};
7use windows_sys::Win32::{Foundation::ERROR_OPERATION_ABORTED, System::IO::OVERLAPPED};
8
9use crate::{
10    AsyncifyPool, DriverType, Entry, ErasedKey, ProactorBuilder,
11    control::Carrier,
12    sys::{driver::AwakeFlag, extra::IocpExtra, prelude::*},
13};
14
15mod cp;
16mod wait;
17
18mod_use![op];
19
20/// Operation type.
21pub enum OpType {
22    /// An overlapped operation.
23    Overlapped,
24    /// A blocking operation, needs a thread to spawn. The `operate` method
25    /// should be thread safe.
26    Blocking,
27    /// A Win32 event object to be waited. The user should ensure that the
28    /// handle is valid till operation completes. The `operate` method should be
29    /// thread safe.
30    Event(RawFd),
31}
32
33/// Low-level driver of IOCP.
34pub(crate) struct Driver {
35    notify: Arc<Notify>,
36    waits: HashMap<usize, wait::Wait>,
37    pool: AsyncifyPool,
38    completed_tx: Sender<Entry>,
39    completed_rx: Receiver<Entry>,
40    _local_marker: PhantomData<ErasedKey>,
41}
42
43impl Driver {
44    pub fn new(builder: &ProactorBuilder) -> io::Result<Self> {
45        instrument!(compio_log::Level::TRACE, "new", ?builder);
46
47        let port = cp::Port::new()?;
48        let driver = port.as_raw_handle() as _;
49        let overlapped = Overlapped::new(driver);
50        let notify = Arc::new(Notify::new(port, overlapped));
51        let (completed_tx, completed_rx) = flume::unbounded();
52
53        Ok(Self {
54            notify,
55            completed_tx,
56            completed_rx,
57            waits: HashMap::default(),
58            pool: builder.create_or_get_thread_pool(),
59            _local_marker: PhantomData,
60        })
61    }
62
63    pub fn driver_type(&self) -> DriverType {
64        DriverType::IOCP
65    }
66
67    fn port(&self) -> &cp::Port {
68        &self.notify.port
69    }
70
71    pub(in crate::sys) fn default_extra(&self) -> IocpExtra {
72        IocpExtra::new(self.port().as_raw_handle() as _)
73    }
74
75    pub fn attach(&mut self, fd: RawFd) -> io::Result<()> {
76        self.port().attach(fd)
77    }
78
79    pub fn cancel(&mut self, key: ErasedKey) {
80        instrument!(compio_log::Level::TRACE, "cancel", ?key);
81        trace!("cancel RawOp");
82        let optr = key.borrow().extra_mut().optr();
83        if let Some(w) = self.waits.get_mut(&key.as_raw())
84            && w.cancel().is_ok()
85        {
86            // The pack has been cancelled successfully, which means no packet will be post
87            // to IOCP. Need not set the result because `create_entry` handles it.
88            self.port().post_raw(optr).ok();
89        }
90        trace!("call OpCode::cancel");
91        // It's OK to fail to cancel.
92        key.borrow().carrier.cancel(optr.cast()).ok();
93    }
94
95    pub fn push(&mut self, key: ErasedKey) -> Poll<io::Result<usize>> {
96        instrument!(compio_log::Level::TRACE, "push", ?key);
97        trace!("push RawOp");
98        let mut op = key.borrow();
99        let optr = op.extra_mut().optr();
100        let op_type = op.carrier.op_type();
101        match op_type {
102            OpType::Overlapped => unsafe {
103                let res = op.carrier.operate(optr.cast());
104                drop(op);
105                if res.is_pending() {
106                    key.into_raw();
107                }
108                res
109            },
110            OpType::Blocking => {
111                drop(op);
112                self.push_blocking(key);
113                Poll::Pending
114            }
115            OpType::Event(e) => {
116                drop(op);
117                self.waits
118                    .insert(key.as_raw(), wait::Wait::new(self.notify.clone(), e, key)?);
119                Poll::Pending
120            }
121        }
122    }
123
124    fn push_blocking(&mut self, key: ErasedKey) {
125        let notify = self.notify.clone();
126        let tx = self.completed_tx.clone();
127
128        // SAFETY: we're submitting into the driver, so it's safe to freeze here.
129        let mut key = unsafe { key.freeze() };
130
131        let mut closure = move || {
132            let res = key.as_mut().operate_blocking();
133            let entry = Entry::new(key.into_inner(), res);
134            _ = tx.send(entry);
135            notify.wake();
136        };
137
138        while let Err(e) = self.pool.dispatch(closure) {
139            closure = e.0;
140            std::thread::yield_now();
141        }
142    }
143
144    pub fn flush(&mut self) -> bool {
145        self.notify.reset()
146    }
147
148    fn create_entry(
149        notify: *const Overlapped,
150        waits: &mut HashMap<usize, wait::Wait>,
151        entry: cp::RawEntry,
152    ) -> Option<Entry> {
153        if entry.overlapped.cast_const() == notify {
154            return None;
155        }
156
157        let entry = Entry::new(
158            unsafe { ErasedKey::from_optr(entry.overlapped) },
159            entry.result,
160        );
161
162        // if there's no wait, just return the entry
163        let Some(w) = waits.remove(&entry.user_data()) else {
164            return Some(entry);
165        };
166
167        let entry = if w.is_cancelled() {
168            Entry::new(
169                entry.into_key(),
170                Err(io::Error::from_raw_os_error(ERROR_OPERATION_ABORTED as _)),
171            )
172        } else if entry.result.is_err() {
173            entry
174        } else {
175            let key = entry.into_key();
176            let result = key.borrow().operate_blocking();
177            Entry::new(key, result)
178        };
179
180        Some(entry)
181    }
182
183    pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
184        instrument!(compio_log::Level::TRACE, "poll", ?timeout);
185
186        let notify = &self.notify.overlapped as *const Overlapped;
187
188        let mut has_entry = false;
189        while let Ok(entry) = self.completed_rx.try_recv() {
190            entry.notify();
191            has_entry = true;
192        }
193        if self.notify.reset() {
194            has_entry = true;
195        }
196
197        if !has_entry {
198            for e in self.notify.port.poll(timeout)? {
199                if let Some(e) = Self::create_entry(notify, &mut self.waits, e) {
200                    self.notify.set_awake();
201                    e.notify()
202                }
203            }
204        }
205        self.notify.set_awake();
206
207        Ok(())
208    }
209
210    pub fn waker(&self) -> Waker {
211        Waker::from(self.notify.clone())
212    }
213
214    pub fn pop_multishot(&mut self, _: &ErasedKey) -> Option<BufResult<usize, crate::sys::Extra>> {
215        None
216    }
217}
218
219impl AsRawFd for Driver {
220    fn as_raw_fd(&self) -> RawFd {
221        self.port().as_raw_handle() as _
222    }
223}
224
225/// A notify handle to the inner driver.
226pub(crate) struct Notify {
227    port: cp::Port,
228    overlapped: Overlapped,
229    awake: AwakeFlag,
230}
231
232impl Notify {
233    fn new(port: cp::Port, overlapped: Overlapped) -> Self {
234        Self {
235            port,
236            overlapped,
237            awake: AwakeFlag::new(),
238        }
239    }
240
241    fn set_awake(&self) {
242        self.awake.set();
243    }
244
245    fn reset(&self) -> bool {
246        self.awake.reset()
247    }
248}
249
250impl Wake for Notify {
251    fn wake(self: Arc<Self>) {
252        self.wake_by_ref();
253    }
254
255    fn wake_by_ref(self: &Arc<Self>) {
256        if !self.awake.wake() {
257            self.port.post_raw(&self.overlapped).ok();
258        }
259    }
260}